跳到主要内容

Java 使用 Zookeeper 实现分布式锁

分布式锁设计思路

分布式锁有多种实现方式,比如通过数据库、redis 都可实现。作为分布式协同工具 ZooKeeper,当然也有着标准的实现方式。下面介绍在 Zookeeper 中如何实现排他锁。

  1. 每个客户端往 /Locks 下创建临时有序节点 /Locks/Lock_,创建成功后 /Locks 下面会有每个客户端对应的节点, 如 /Locks/Lock_00000001
  2. 客户端取得 /Locks 下子节点,并进行排序,判断排在最前面的是否为自己,如果自己的锁节点在第一位,代表获取锁成功
  3. 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点为 Lock_00000002,那么则监听 Lock_00000001
  4. 当前一位锁节点 Lock_0000001 对应的客户端执行完成,释放了锁,将会触发监听客户端 Lock_00000002 的逻辑
  5. 监听客户端重新执行第2步逻辑,判断自己是否获得了锁

基础连接

因为这个连接方式都大同小异,所以这里直接写在这里了,下面就不再重复这块代码

public class MyLock {
// zk连接对象
public static ZooKeeper zooKeeper;
// 计数器对象
private static CountDownLatch countDownLatch = new CountDownLatch(1);
// zk的连接串
private static String ip = "127.0.0.1:2181";
// 锁的节点名称
public static final String LOCK_ROOT_PATH = "/Locks";
public static final String LOCK_NODE_NAME = LOCK_ROOT_PATH + "/lock_";
private String lockPath;

private Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
synchronized (this) {
notifyAll();
}
}
}
};


/**
* 构造方法
*/
public MyLock() {
try {
// 打开zookeeper连接
zooKeeper = new ZooKeeper(ip, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
countDownLatch.countDown();
}
}
}
});
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}

创建节点

/**
* 创建锁节点
*/
private void createLock() {
try {
Stat exists = zooKeeper.exists(LOCK_ROOT_PATH, false);
// 判断/Locks是否存在,不存在则创建
if (exists == null) {
zooKeeper.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 创建临时有序节点,把这个保存到全局变量里面,下面还要用到
lockPath = zooKeeper.create(LOCK_NODE_NAME, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("节点创建成功:" + lockPath);
} catch (Exception e) {
e.printStackTrace();
}
}

尝试获取锁

/**
* 尝试获取锁
*/
private void attemptLock() {
try {
// 获取 /Locks 节点下的所有子节点
List<String> children = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
// 对子节点进行排序
Collections.sort(children);
// 获取上面创建的临时有序路径,例如这里的切割 /Locks/Lock_0000002 为 0000002(这里切割的目的是减少字符匹配次数),然后去查找 ZK 里面的的全部 Node
int index = children.indexOf(lockPath.substring((LOCK_NODE_NAME.length() - LOCK_ROOT_PATH.length()) + 1));
System.out.println(index);
// 如果创建的这个节点就是第一位,则表示取得了锁
if (index == 0) {
System.out.println("获取锁成功");
return;
} else {
// 如果没有找到,说明当前有服务在占用这个锁,则监听这个第一个 Node(/Locks/Lock_0000001)
String path = children.get(index - 1);
Stat exists = zooKeeper.exists(LOCK_ROOT_PATH + "/" + path, watcher);
// 如果 exists 为空表示占用锁的那个应用已经释放锁了,再次尝试获取锁
if (exists == null) {
attemptLock();
} else {
// 这里开始使用上面创建的 watcher 监听 NodeDeleted 事件
synchronized (watcher) {
// 释放自己获取的锁,直到被唤醒后才重新获取这个锁
watcher.wait();
}
attemptLock();
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}

释放锁

这个没啥好说,就是用完把这个 Node 删除,标识释放了锁

/**
* 释放锁
*/
private void releaseLock() {
try {
// 删除临时有序节点
zooKeeper.delete(this.lockPath, -1);
zooKeeper.close();
System.out.println("锁已释放:" + lockPath);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}

获取锁

把上面的两个方法封装一下

/**
* 获取锁
*/
public void acquireLock() {
//创建锁节点
createLock();
//尝试获取锁
attemptLock();
}

编写测试类

/**
* 测试
*/
public static void main(String[] args) {
MyLock myLock = new MyLock();
// 获取锁
myLock.acquireLock();
sall();
// 释放锁
myLock.releaseLock();
}

public static void sall() {
System.out.println("售票开始");
// 线程休眠,模拟现实中耗时的操作
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("售票结束");
}

开三个窗口

可以观察到,它们会按序争抢这个锁